package com.blog.spark.job

import com.blog.spark.utils.AccessConvertUtils
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  * @description 第二步清洗：使用spark完成我们的数据清洗，第一步已经合并
  * @author yuyon26@126.com
  * @date 2018/10/5 17:51
  */
object SparkStatCleanJob {

  /**
    * Entry point for the second-stage cleaning job.
    *
    * Reads the merged raw access log, drops malformed lines, parses each
    * line into a structured Row via [[AccessConvertUtils]], deduplicates,
    * and writes the result as gzip-compressed parquet.
    *
    * @param args optional overrides: args(0) = input path, args(1) = output path.
    *             Defaults preserve the original hard-coded local paths.
    */
  def main(args: Array[String]): Unit = {

    // Allow paths to be supplied on the command line; fall back to the
    // original development defaults for backward compatibility.
    val inputPath  = if (args.length > 0) args(0) else "D:\\spark\\access.log"
    val outputPath = if (args.length > 1) args(1) else "D:\\spark\\clean2"

    val sparkSession = SparkSession.builder().appName("SparkStatCleanJob").master("local[2]")
      .config("spark.sql.parquet.compression.codec", "gzip")
      .getOrCreate()

    try {
      // Drop lines containing escaped binary junk ("\x...") before parsing.
      val accessRDD = sparkSession.sparkContext.textFile(inputPath).filter(x => !x.contains("\\x"))

      // RDD[String] => RDD[Row]; discard rows where any field failed to parse.
      val rowRDD = accessRDD.map(x => AccessConvertUtils.parseLog(x)).filter(row => !row.anyNull)
      val accessDF = sparkSession.createDataFrame(rowRDD, AccessConvertUtils.struct)

      // Deduplicate and write a single gzip parquet file, replacing any prior output.
      accessDF.distinct()
        .coalesce(1).write.format("parquet").mode(SaveMode.Overwrite)
        .save(outputPath)
    } finally {
      // Ensure the session is released even if the job fails.
      sparkSession.stop()
    }
  }

}
